package com.hvisions.config;

import com.alibaba.fastjson.JSON;
import com.hvisions.server.runner.ApplicationRunnerImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem;
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Subscribes to the OPC UA nodes whose identifiers are stored in the Redis list
 * {@code opc:point} and logs every value change reported by the server.
 *
 * <p>{@link #run()} registers a subscription listener on {@code OpcUaConfig.opcLink} and then
 * builds the subscription. The subscription is rebuilt when the server reports a failed
 * subscription transfer or when the Redis point list no longer matches the subscribed one.
 *
 * @author ykz
 */
@Component
@Slf4j
public class DeviceOpenConfigProcess implements Runnable {

    /** Redis list key holding the node identifiers to subscribe to. */
    private static final String POINT_LIST_KEY = "opc:point";
    /** Name of the Redisson lock used to pick the instance that is allowed to send data. */
    private static final String MASTER_LOCK_NAME = "cjxjy-wareHouse-project";
    /** Namespace index used for every subscribed node id. */
    private static final int NAMESPACE_INDEX = 2;
    /** Publishing interval requested for the subscription, in milliseconds. */
    private static final double PUBLISHING_INTERVAL_MS = 1000.0;
    /** Delay between two Redis look-ups while the point list is empty, in milliseconds. */
    private static final long EMPTY_RETRY_DELAY_MS = 3 * 1000L;

    @Autowired
    StringRedisTemplate redisTemplate;
    @Autowired
    OpcUaConfig opcUaConfig;
    @Autowired
    Redisson redisson;

    /**
     * Subscription listener that logs subscription life-cycle events and rebuilds the
     * subscription when a transfer fails.
     */
    private class CustomSubscriptionListener implements UaSubscriptionManager.SubscriptionListener {

        /**
         * Logs the keep-alive and tries to take the distributed lock; on success this instance
         * is flagged as allowed to send data.
         */
        @Override
        public void onKeepAlive(UaSubscription subscription, DateTime publishTime) {
            log.info("opcUa监听：onKeepAlive（连接存活中）");
            // The Redisson lock decides which service instance is allowed to send data.
            // NOTE(review): the lock is never unlocked and the flag is never reset to false in
            // this class - confirm that relying on lock expiry is intended.
            RLock rLock = redisson.getLock(MASTER_LOCK_NAME);
            if (rLock.tryLock()) {
                ApplicationRunnerImpl.send = true;
            }
        }

        /** Logs a subscription status change. */
        @Override
        public void onStatusChanged(UaSubscription subscription, StatusCode status) {
            log.info("opcUa监听：onStatusChanged（连接状态改变）");
        }

        /** Logs a publish failure (connection lost). */
        @Override
        public void onPublishFailure(UaException exception) {
            log.info("opcUa监听：onPublishFailure（连接断开）");
        }

        /** Logs that notification data was lost. */
        @Override
        public void onNotificationDataLost(UaSubscription subscription) {
            log.info("opcUa监听：onNotificationDataLost（数据丢失）");
        }

        /**
         * Called when restoring a previous subscription fails during reconnect; rebuilds the
         * subscription from scratch.
         *
         * @param uaSubscription the subscription that could not be transferred
         * @param statusCode     the status reported for the failed transfer
         */
        @Override
        public void onSubscriptionTransferFailed(UaSubscription uaSubscription, StatusCode statusCode) {
            log.info("opcUa监听：正在重新启动订阅");
            deviceOpenConfig(OpcUaConfig.opcLink);
        }
    }

    /**
     * Registers the subscription listener on the shared client and creates the subscription.
     */
    @Override
    public void run() {
        OpcUaConfig.opcLink.getSubscriptionManager().addSubscriptionListener(new CustomSubscriptionListener());
        deviceOpenConfig(OpcUaConfig.opcLink);
    }

    /**
     * Replaces all subscriptions of {@code opcClient} with a single subscription that monitors
     * every node listed in Redis.
     *
     * <p>Blocks while the Redis point list is missing or empty, polling every
     * {@link #EMPTY_RETRY_DELAY_MS} milliseconds. If the calling thread is interrupted the
     * interrupt flag is restored and the method returns without subscribing. Any other failure
     * is logged and not rethrown.
     *
     * @param opcClient the connected OPC UA client to subscribe with
     */
    public void deviceOpenConfig(UaClient opcClient) {
        try {
            // Drop existing subscriptions first so that a node is not monitored by several
            // subscriptions after a reconnect.
            clearSubscriptions(opcClient);

            List<String> points = awaitPoints();
            // Mutable holder: the value callback replaces it when Redis content changes.
            AtomicReference<List<String>> key = new AtomicReference<>(points);

            UaSubscription subscription =
                    opcClient.getSubscriptionManager().createSubscription(PUBLISHING_INTERVAL_MS).get();

            // Create the monitored items and register the value-change callback on each one.
            subscription.createMonitoredItems(
                    TimestampsToReturn.Both,
                    buildRequests(points),
                    (item, id) -> item.setValueConsumer((is, value) -> {
                        // Re-read the list on every notification to detect configuration changes.
                        List<String> lists = redisTemplate.opsForList().range(POINT_LIST_KEY, 0, -1);
                        Boolean exist = opcUaConfig.exist(key.get(), lists);
                        if (!exist) {
                            log.info("---------deviceOpenConfig缓存数据发生了变化------------");
                            key.set(lists);
                            deviceOpenConfig(OpcUaConfig.opcLink);
                            return;
                        }
                        String nodeName = item.getReadValueId().getNodeId().getIdentifier().toString();
                        // A notification may carry no value; render it as "null" instead of
                        // throwing inside the notification callback.
                        Object raw = value.getValue() == null ? null : value.getValue().getValue();
                        String nodeValue = String.valueOf(raw);
                        log.info("参数:{}-----------值:{}", nodeName, nodeValue);
                        // The send flag decides whether this instance may forward the data.
                        if (ApplicationRunnerImpl.send) {
                            // send data (not implemented here)
                        } else {
                            log.info("当前opc非master,不允许发送数据");
                        }
                    }))
                    .whenComplete((items, error) -> {
                        // Surface creation failures without blocking the calling thread.
                        if (error != null) {
                            log.error("opcUa: creating monitored items failed", error);
                            return;
                        }
                        for (UaMonitoredItem created : items) {
                            if (!created.getStatusCode().isGood()) {
                                log.warn("opcUa: monitored item {} rejected with status {}",
                                        created.getReadValueId().getNodeId(), created.getStatusCode());
                            }
                        }
                    });
        } catch (InterruptedException e) {
            // Preserve the interrupt for the owner of this thread.
            Thread.currentThread().interrupt();
            log.warn("opcUa: subscription setup interrupted");
        } catch (Exception e) {
            log.error("----------参数出现异常----------");
            log.error("123", e);
        }
    }

    /**
     * Deletes every subscription currently known to the client's subscription manager,
     * waiting for each deletion. A failed deletion is logged and does not stop the others.
     */
    private void clearSubscriptions(UaClient opcClient) throws InterruptedException {
        // Iterate over a snapshot: the manager's list changes as deletions complete.
        List<UaSubscription> existing = new ArrayList<>(opcClient.getSubscriptionManager().getSubscriptions());
        for (UaSubscription stale : existing) {
            try {
                opcClient.getSubscriptionManager().deleteSubscription(stale.getSubscriptionId()).get();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                log.warn("opcUa: failed to delete subscription {}", stale.getSubscriptionId(), e);
            }
        }
    }

    /**
     * Reads the point list from Redis, polling until it contains at least one entry.
     */
    private List<String> awaitPoints() throws InterruptedException {
        List<String> points = redisTemplate.opsForList().range(POINT_LIST_KEY, 0, -1);
        while (points == null || points.isEmpty()) {
            log.info("暂无采集数据");
            Thread.sleep(EMPTY_RETRY_DELAY_MS);
            points = redisTemplate.opsForList().range(POINT_LIST_KEY, 0, -1);
        }
        return points;
    }

    /**
     * Builds one monitored-item request per node identifier.
     */
    private List<MonitoredItemCreateRequest> buildRequests(List<String> points) {
        List<MonitoredItemCreateRequest> requests = new ArrayList<>(points.size());
        for (int i = 0; i < points.size(); i++) {
            NodeId nodeId = new NodeId(NAMESPACE_INDEX, points.get(i));
            ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
            MonitoringParameters parameters = new MonitoringParameters(
                    UInteger.valueOf(1 + i),  // client handle, unique per item
                    0.0,                      // sampling interval
                    null,                     // filter, null means use default
                    UInteger.valueOf(10),     // queue size
                    true                      // discard oldest
            );
            requests.add(new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters));
        }
        return requests;
    }
}